# Original File: RAILS_ROOT/activesupport/lib/active_support/queueing.rb
require "delegate"
require "logger"
require "thread"

module Rails
  module Queue

    # A Queue that simply inherits from STDLIB's Queue. When this
    # queue is used, Rails automatically starts a job runner in a
    # background thread.
    class Queue < ::Queue
      # Allows a custom consumer to be injected in place of the lazily
      # built default one.
      attr_writer :consumer

      # consumer_options - Hash passed along to the default consumer when it
      #                    is first built.
      def initialize(consumer_options = {})
        super()
        @consumer_options = consumer_options
      end

      # Returns the consumer attached to this queue. Unless one was assigned
      # through +consumer=+, a ThreadedQueueConsumer is created on first use
      # and reused afterwards.
      def consumer
        return @consumer if @consumer

        @consumer = ThreadedQueueConsumer.new(self, @consumer_options)
      end

      # Drain the queue by delegating to the consumer's +drain+. This method
      # may not be available on production queues.
      def drain
        consumer.drain
      end
    end

    class SynchronousQueue < Queue
      # Enqueues +job+ through the inherited +push+ and then calls +drain+
      # right away. Returns whatever the inherited +push+ returned.
      def push(job)
        pushed = super
        drain
        pushed
      end
      alias enq push
      alias <<  push
    end

    # In test mode, the Rails queue is backed by an Array so that assertions
    # can be made about its contents. The test queue provides a +jobs+
    # method to make assertions about the queue's contents and a +drain+
    # method to drain the queue and run the jobs.
    #
    # Jobs are run in a separate thread to catch mistakes where code
    # assumes that the job is run in the same thread.
    class TestQueue < Queue
      # Get a list of the jobs off this queue. This method may not be
      # available on production queues.
      #
      # Returns a new Array holding the queued jobs in FIFO order; the queue
      # still contains the same jobs afterwards.
      def jobs
        # The pure-Ruby stdlib Queue kept its items in @que. Where that
        # instance variable is not set, fall back to the public API.
        return @que.dup if instance_variable_defined?(:@que) && @que

        pending = []
        begin
          pending << pop(true) until empty?
        rescue ThreadError
          # A non-blocking pop raises ThreadError when the queue is empty,
          # e.g. because another thread popped between the check and the pop.
        end
        # NOTE: the snapshot is not atomic; jobs are briefly off the queue.
        pending.each { |job| requeue(job) }
        pending
      end

      # Marshal and unmarshal job before pushing it onto the queue.  This will
      # raise an exception on any attempts in tests to push jobs that can't (or
      # shouldn't) be marshalled.
      def push(job)
        super Marshal.load(Marshal.dump(job))
      end
      # Re-alias so that +<<+ and +enq+ go through the marshalling +push+
      # above instead of the inherited implementation.
      alias <<  push
      alias enq push

      private

      # Puts +job+ back with the stdlib Queue#push, skipping the Marshal
      # round trip so the very same object stays on the queue.
      def requeue(job)
        ::Queue.instance_method(:push).bind(self).call(job)
      end
    end

    # The threaded consumer will run jobs in a background thread in
    # development mode or in a VM where running jobs on a thread in
    # production mode makes sense.
    #
    # When the process exits, the consumer pushes a nil onto the
    # queue and joins the thread, which will ensure that all jobs
    # are executed before the process finally dies.
    class ThreadedQueueConsumer
      # Logger that receives job failure reports. When it is nil, a logger
      # writing to $stderr is used instead.
      attr_accessor :logger

      # queue   - object the jobs are taken from; +push+, +pop+ and +empty?+
      #           are called on it.
      # options - Hash; only the +:logger+ key is read.
      def initialize(queue, options = {})
        @queue = queue
        @logger = options[:logger]
        @fallback_logger = Logger.new($stderr)
        @thread = nil
      end

      # Starts a thread that runs +consume+. Returns self.
      def start
        @thread = Thread.new { consume }
        self
      end

      # Pushes the nil sentinel that ends +consume+ and waits for the
      # consumer thread to finish.
      #
      # Does nothing when no consumer thread is running: pushing the
      # sentinel then would leave a nil on the queue that nothing consumes.
      # Returns the consumer thread, or nil if +start+ was never called.
      def shutdown
        return @thread unless @thread && @thread.alive?

        @queue.push nil
        @thread.join
      end

      # Runs queued jobs in the calling thread until the queue is empty.
      # Jobs are invoked directly rather than through +run+, so an exception
      # raised by a job propagates to the caller.
      def drain
        @queue.pop.run until @queue.empty?
      end

      # Pops and runs jobs until a nil (or false) value is popped.
      def consume
        while job = @queue.pop
          run job
        end
      end

      # Runs a single job, reporting any failure through +handle_exception+.
      def run(job)
        job.run
      rescue Exception => exception
        # Exception (not just StandardError) is rescued so that the +consume+
        # loop keeps going whatever a job raises.
        handle_exception job, exception
      end

      # Logs the failed job together with the exception message and backtrace.
      def handle_exception(job, exception)
        # backtrace is nil for an exception that was never raised.
        backtrace = Array(exception.backtrace).join("\n")
        (logger || @fallback_logger).error "Job Error: #{job.inspect}\n#{exception.message}\n#{backtrace}"
      end
    end

  end
end
